iT邦幫忙

2021 iThome 鐵人賽

DAY 22
1

在flow那篇我們了解到flow的特性,尤其是每次collect都會創建新的實例,但在某些use case卻不適用,而kotlin為此推出了shareflow和stateflow

可以先了解到stateflow是繼承自shareflow的,所以我會先講shareflow,再聊到stateflow

相對於flow,shareFlow和stateFlow都是hot flow,意即flow需要有人collect才會執行且每次都會建立新的實例,而shareflow不同,它可以讓多追蹤者接收同一則訊息
A SharedFlow that represents a read-only state with a single updatable data value that emits updates to the value to its collectors.

定義

A hotFlow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors.

首先,SharedFlow 可以同時有多個 collecter,在 SharedFlow /StateFlow中 collecter 稱為 subscriber(訂閱者)。而所有的訂閱者可以追蹤同一個實例,如此一來,他們都可以收到值。

shareflow作為熱流,會向所有訂閱者發送值,讓全部的訂閱者能收到全部的值,而他作為熱流的特性在於,他可以獨立於collector存在

正文

要建立shareflow有兩種方式

private val _mShareFlow = MutableSharedFlow<Int>()
val mShareFlow = _mShareFlow.asSharedFlow()

//or
val extensionShareFlow = flow{
    emit(1)
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(),replay = 0)

可以注意到,在shareIn多幾個值要設定,帶他其實就是在裡面實作MutableSharedFlow

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return ReadonlySharedFlow(shared, job)
}

那我們來講講那些東西到底什麼意思

replay

A shared flow keeps a specific number of the most recent values in its replay cache.
shareflow可以設置回撥快取,每個追蹤者會先拿到快取資訊,再拿到最新的值
我們可以透過建構子的replay設置replay cahce max size

private val _dsd = MutableSharedFlow<Int>(replay = 100)
val dsd = _dsd.asSharedFlow()

我們同樣也可以透過replayCache拿到整個回撥快取資料

_dsd.replayCache // return List<Int>

replayCache:List< T> ,當有一個新的 collector 加入時,就會根據設定的 replay 數量來把最後的項目廣播給新的 collector 上。

也能夠清空回撥快取

_dsd.resetReplayCache()

scope

這邊是帶入我們在這個類別中所建立的 CoroutineScope,目的是用來定義共享開始的 Coroutine Scope。

started 開始

started 則是用來決定 SharedFlow 什麼時候會開始啟動。而在這邊有三個選項可供選擇

  • Eagerly:當 SharedFlow 建立起來之後,就會立刻起動,而且永不停止。
  • Lazily:等到第一個追蹤者加入的時候,才會啟動,同樣也是永不停止。
  • WhileSubscribed:預設是當第一個訂追蹤者加入的時候就會啟動,當最後一個追蹤者取消的時候就會停止。所以與前面兩個的第一個不同點就是它會停止。

假設需求總是監聽位置更新並在應用程序來自後台時在屏幕上顯示最後 10 個位置,我們使用replay值 10 將最後 10 個發出的項目保留在內存中,並在每次收集器觀察流時重新發出這些項目。為了保持底層流始終處於活動狀態並發出位置更新,SharingStarted.Eagerly即使沒有收集器,也可以使用策略來監聽更新。

WhileSubscribed參數

  • stopTimeoutMillis:沒有追蹤者時,多久後停止

  • replayExpirationMillis:幾秒後reset replay

最開始的時候講了,shareflow會一直獨立於訂閱者存在,但我們能夠透過WhileSubscribed讓shareflow在沒有訂閱者的時候停止,但問題來了,如果我們在fragment追蹤,螢幕旋轉了怎麼處理?

根據官方博文,他們建議以WhileSubscribed(5000L),讓shareflow變成在沒有訂閱者的5秒才停止

extraBufferCapacity

A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to get values from the buffer without suspending emitters.
The buffer space determines how much slow subscribers can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the extraBufferCapacity parameter.

回播快取也發射提供緩衝區,讓慢速訂閱者可以從buffer換取資料,而不必讓發送區掛起

緩衝區會決定慢速訂閱者和快速訂閱者的差距,建立shareflow時,超過replay的資料可以用buffer緩衝。

BufferOverflow

A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using the onBufferOverflow parameter, which is equal to one of the entries of the BufferOverflow enum. When a strategy other than SUSPENDED is configured, emissions to the shared flow never suspend.

有緩存的shaerflow可以設置是否要再緩存區溢出時suspend,當設置了SUSPEND以外的列舉時,緩存溢出將不會掛起

SUSPEND - the upstream that is sending or is emitting a value is suspended while the buffer is full. 當有個訂閱者會執行耗時任務,且再緩存滿了的時候還在執行,這時上流的emit會被掛起,直到他準備好接收新值。

DROP_OLDEST - 丟棄最舊的值

DROP_LATEST- 丟棄最新的值
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-buffer-overflow/index.html

緩存溢出僅會在至少一個追蹤者還沒準備好接收最新值的時候溢出,如果沒有追蹤者,僅會REPLAY存儲最新的值,並且永遠不會發生緩存溢出,本質上在沒有追蹤者的情況下,行為類似於DROP.OLDEST, but the buffer is just of replay size (without any extraBufferCapacity).

緩衝和快取行為


SharedFlow 可以同時將快取內的資料同時傳送給所有 collect,
我們可以根據需求設定快取的大小,當有新的訂閱者加入時,就會把最後幾筆資料傳送給訂閱者,而當更新的資料傳送完畢之後,就會跟其他的訂閱者一起接收新的資料。

//VIEWMDOEL
private val _mShareFlow = MutableSharedFlow<Int>(
    replay = 2,
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val mShareFlow = _mShareFlow.asSharedFlow()

init{
    viewModelScope.launch {
        var count = 1
        while(true){
            _mShareFlow.emit(count)
            count++
            delay(1000L)
        }

    }
}
lifecycleScope.launch {
    launch  (Dispatchers.IO){
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                Timber.d("FIRST collect $it")
            }
        }
    }
    delay(5000L)
    launch (Dispatchers.IO){
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                Timber.d("SECOND collect $it")
            }
        }
    }

}

上面用看的應該是看不懂,我寫了個範例

FIRST collect 1
FIRST collect 2
FIRST collect 3
FIRST collect 4
FIRST collect 5
SECOND collect 4//只有REPLAY的值
SECOND collect 5
FIRST collect 6
SECOND collect 6
FIRST collect 7
SECOND collect 7
lifecycleScope.launch {
    launch  (Dispatchers.IO){
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                Timber.d("FIRST collect $it")
            }
        }
    }
    launch {
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                delay(5000L)
                Timber.d("STUCK collect $it")
            }
        }
    }

}
FIRST collect 1
FIRST collect 2
FIRST collect 3
FIRST collect 4
FIRST collect 5
STUCK collect 1//collect收到1才delay
FIRST collect 6
FIRST collect 7
FIRST collect 8
FIRST collect 9
FIRST collect 10
STUCK collect 3//collect印出1之後,收到extrabuffer的3再delay,此時extrabuffer 3 replay 4 5 emit 6
FIRST collect 11
FIRST collect 12
FIRST collect 13
FIRST collect 14
FIRST collect 15
STUCK collect 8//同上emit 11

連結

Kotlin github io shareflow

jast

should-we-choose-kotlins-stateflow-or-sharedflow-to-substitute-for-android-s-livedata


上一篇
day21 開分支,淺談kotlin paging3 with flow
下一篇
day23 stateFlow狀態流,又是沒梗的一天
系列文
解鎖kotlin coroutine的各種姿勢-新手篇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言